home *** CD-ROM | disk | FTP | other *** search
/ Language/OS - Multiplatform Resource Library / LANGUAGE OS.iso / p4 / p4-1_2c.lha / p4-1.2c / lib / p4_tsr.c < prev    next >
C/C++ Source or Header  |  1993-07-13  |  14KB  |  547 lines

  1. #include "p4.h"
  2. #include "p4_sys.h"
  3.  
  4. /*
  5.  * search_p4_queue tries to locate a message of the desired type in the
  6.  * local queue of messages already received.  If it finds one, it dequeues it 
  7.  * if deq is true, and returns its address; otherwise it returns NULL.
  8.  */
  9. struct p4_msg *search_p4_queue(req_type, req_from, deq)
  10. int req_type, req_from;
  11. P4BOOL deq;
  12. {
  13.     struct p4_queued_msg *qpp, *qp;
  14.     struct p4_msg *tqp;
  15.     P4BOOL found;
  16.  
  17.     tqp = NULL;
  18.     qpp = NULL;
  19.     found = FALSE;
  20.     qp = p4_local->queued_messages->first_msg;
  21.  
  22.     while (qp && !found)
  23.     {
  24.     if (((qp->qmsg->type == req_type) || (req_type == -1)) &&
  25.         ((qp->qmsg->from == req_from) || (req_from == -1)))
  26.     {
  27.         found = TRUE;
  28.         if (deq)
  29.         {
  30.         if (p4_local->queued_messages->first_msg ==
  31.             p4_local->queued_messages->last_msg)
  32.         {
  33.             p4_local->queued_messages->first_msg = NULL;
  34.             p4_local->queued_messages->last_msg = NULL;
  35.         }
  36.         else
  37.         {
  38.             if (qp == p4_local->queued_messages->first_msg)
  39.             {
  40.             p4_local->queued_messages->first_msg = qp->next;
  41.             }
  42.             else if (qp == p4_local->queued_messages->last_msg)
  43.             {
  44.             p4_local->queued_messages->last_msg = qpp;
  45.             qpp->next = NULL;
  46.             }
  47.             else
  48.             {
  49.             qpp->next = qp->next;
  50.             }
  51.         }
  52.         }
  53.     }
  54.     else
  55.     {
  56.         qpp = qp;
  57.         qp = qp->next;
  58.     }
  59.     }
  60.     if (found)
  61.     {
  62.     p4_dprintfl(30,"extracted queued msg of type %d from %d\n",
  63.             qp->qmsg->type,qp->qmsg->from);
  64.     tqp = qp->qmsg;
  65.     if (deq)
  66.     {
  67.         free_quel(qp);
  68.     }
  69.     }
  70.     return (tqp);
  71. }
  72.  
  73. /*
  74.  * This is the top-level receive routine, called by the user.
  75.  *   req_type is either a desired type or -1.  In the -1 case it will be
  76.  *        modified  by p4_recv to indicate the type actually received.
  77.  *   req_from is either a desired source or -1.  In the -1 case it will be
  78.  *        modified by p4_recv to the source of the message actually received.
  79.  *   msg will be set by p4_recv to point to a buffer containing the message.
  80.  *   len_rcvd will be set by p4_recv to contain the length of the message.
  81.  *
  82.  *   returns 0 if successful; non-zero if error
  83.  */
  84. int p4_recv(req_type, req_from, msg, len_rcvd)
  85. int *req_type, *req_from, *len_rcvd;
  86. char **msg;
  87. {
  88.     struct p4_msg *tmsg;
  89.     P4BOOL good;
  90.  
  91.     p4_dprintfl(20, "receiving for type = %d, sender = %d\n",
  92.         *req_type, *req_from);
  93.     ALOG_LOG(p4_local->my_id,END_USER,0,"");
  94.     ALOG_LOG(p4_local->my_id,BEGIN_RECV,*req_from,"");
  95.  
  96.     if (!(tmsg = search_p4_queue(*req_type, *req_from, 1)))
  97.     {
  98.     for (good = FALSE; !good;)
  99.     {
  100.             ALOG_LOG(p4_local->my_id,END_RECV,0,"");
  101.             ALOG_LOG(p4_local->my_id,BEGIN_WAIT,0,"");
  102.             tmsg = recv_message(req_type, req_from);
  103.             ALOG_LOG(p4_local->my_id,END_WAIT,0,"");
  104.             ALOG_LOG(p4_local->my_id,BEGIN_RECV,0,"");
  105.         if (tmsg == NULL)
  106.         {
  107.         p4_dprintf("p4_recv: could not receive a message\n");
  108.         return (-1);
  109.         }
  110.         if (((tmsg->type == *req_type) || (*req_type == -1)) &&
  111.         ((tmsg->from == *req_from) || (*req_from == -1)))
  112.         {
  113.         good = TRUE;
  114.         }
  115.         else
  116.         {
  117.         if (tmsg->ack_req & P4_BROADCAST_MASK)
  118.         {
  119.             if (subtree_broadcast_p4(tmsg->type, tmsg->from,(char *) &tmsg->msg,
  120.                          tmsg->len, tmsg->data_type))
  121.             {
  122.             p4_dprintf("p4_recv: subtree_brdcst failed\n");
  123.             return(-1);
  124.             }
  125.             tmsg->ack_req &= ~P4_BROADCAST_MASK;    /* Unset broadcast bit */
  126.             if (tmsg->from == p4_get_my_id())
  127.             free_p4_msg(tmsg);    /* Don't want broadcast from self */
  128.             else
  129.             queue_p4_message(tmsg, p4_local->queued_messages);
  130.         }
  131.         else
  132.             queue_p4_message(tmsg, p4_local->queued_messages);
  133.         }
  134.     }
  135.     }
  136.  
  137.     if (tmsg->ack_req & P4_BROADCAST_MASK)
  138.     {
  139.     if (subtree_broadcast_p4(tmsg->type, tmsg->from,(char *) &tmsg->msg,
  140.                  tmsg->len, tmsg->data_type))
  141.     {
  142.         p4_dprintf("p4_recv: subtree_brdcst failed\n");
  143.         return(-1);
  144.     }
  145.     tmsg->ack_req &= ~P4_BROADCAST_MASK;    /* Unset broadcast bit */
  146.     }
  147.  
  148.     *req_type = tmsg->type;
  149.     *req_from = tmsg->from;
  150.     p4_dprintfl(10, "received type=%d, from=%d\n",*req_type,*req_from);
  151.     if (*msg == NULL)
  152.     {
  153.     *msg = (char *) &(tmsg->msg);
  154.     *len_rcvd = tmsg->len;
  155.     }
  156.     else
  157.     {
  158.     /* copy into the user's buffer area, truncating if necessary */
  159.     if (tmsg->len < *len_rcvd)
  160.         *len_rcvd = tmsg->len;
  161.     bcopy((char *) &(tmsg->msg),*msg,*len_rcvd);
  162.     free_p4_msg(tmsg);
  163.     }
  164.     ALOG_LOG(p4_local->my_id,END_RECV,*req_from,"");
  165.     ALOG_LOG(p4_local->my_id,BEGIN_USER,0,"");
  166.  
  167.     return (0);
  168. }
  169.  
  170. struct p4_msg *recv_message(req_type,req_from)
  171. int *req_type, *req_from;
  172. {
  173.     int rc, len;
  174.     struct p4_msg *tmsg;
  175.  
  176. #if  defined(CAN_DO_SOCKET_MSGS) && \
  177.     !defined(CAN_DO_SHMEM_MSGS)  && \
  178.     !defined(CAN_DO_CUBE_MSGS)   && \
  179.     !defined(CAN_DO_SWITCH_MSGS)
  180.  
  181.     return (socket_recv());
  182.  
  183. #else
  184.  
  185.     while (TRUE)
  186.     {
  187. #       if defined(CAN_DO_SHMEM_MSGS)
  188.     if (shmem_msgs_available())
  189.     {
  190.         return (shmem_recv());
  191.     }
  192. #       endif
  193.  
  194. #       if defined(CAN_DO_SOCKET_MSGS)
  195.     if (socket_msgs_available())
  196.     {
  197.         return (socket_recv());
  198.     }
  199. #       endif
  200.  
  201. #       if defined(CAN_DO_CUBE_MSGS)
  202.     if (MD_cube_msgs_available())
  203.         return (MD_cube_recv());
  204. #       endif
  205.  
  206. #       if defined(CAN_DO_SWITCH_MSGS)
  207.     if (p4_global->proctable[p4_local->my_id].switch_port != -1)
  208.     {
  209.         if (rc = sw_probe(req_from, p4_local->my_id, req_type, &len))
  210.         {
  211.         tmsg = alloc_p4_msg(len - sizeof(struct p4_msg) + sizeof(char *));
  212.         sw_recv(rc, tmsg);
  213.         p4_dprintfl(10, "p4_recv: received message from switch\n");
  214.         return (tmsg);
  215.         }
  216.     }
  217. #       endif
  218. #       if defined(CAN_DO_TCMP_MSGS)
  219.     if (MD_tcmp_msgs_available(req_type,req_from))
  220.     {
  221.         return (MD_tcmp_recv());
  222.     }
  223. #       endif
  224.     }
  225.  
  226. #endif
  227. }
  228.  
  229. P4BOOL p4_messages_available(req_type, req_from)
  230. int *req_type, *req_from;
  231. {
  232.     int found, len;
  233.     struct p4_msg *tmsg;
  234.  
  235.     ALOG_LOG(p4_local->my_id,END_USER,0,"");
  236.     ALOG_LOG(p4_local->my_id,BEGIN_WAIT,1,"");
  237.  
  238.     found = FALSE;
  239.     if (tmsg = search_p4_queue(*req_type, *req_from, 0))
  240.     {
  241.     found = TRUE;
  242.     *req_type = tmsg->type;
  243.     *req_from = tmsg->from;
  244.     }
  245.  
  246. #   if defined(CAN_DO_SHMEM_MSGS)
  247.     while (!found && shmem_msgs_available())
  248.     {
  249.     tmsg = shmem_recv();
  250.     if (((tmsg->type == *req_type) || (*req_type == -1)) &&
  251.         ((tmsg->from == *req_from) || (*req_from == -1)))
  252.     {
  253.         found = TRUE;
  254.         *req_type = tmsg->type;
  255.         *req_from = tmsg->from;
  256.     }
  257.     queue_p4_message(tmsg, p4_local->queued_messages);
  258.     }
  259. #   endif
  260.  
  261. #   if defined(CAN_DO_SOCKET_MSGS)
  262.     while (!found && socket_msgs_available())
  263.     {
  264.     tmsg = socket_recv();
  265.     if (((tmsg->type == *req_type) || (*req_type == -1)) &&
  266.         ((tmsg->from == *req_from) || (*req_from == -1)))
  267.     {
  268.         found = TRUE;
  269.         *req_type = tmsg->type;
  270.         *req_from = tmsg->from;
  271.     }
  272.     queue_p4_message(tmsg, p4_local->queued_messages);
  273.     }
  274. #   endif
  275.  
  276. #   if defined(CAN_DO_CUBE_MSGS)
  277.     while (!found && MD_cube_msgs_available())
  278.     {
  279.     tmsg = MD_cube_recv();
  280.     if (((tmsg->type == *req_type) || (*req_type == -1)) &&
  281.         ((tmsg->from == *req_from) || (*req_from == -1)))
  282.     {
  283.         found = TRUE;
  284.         *req_type = tmsg->type;
  285.         *req_from = tmsg->from;
  286.     }
  287.     queue_p4_message(tmsg, p4_local->queued_messages);
  288.     }
  289. #   endif
  290.  
  291.  
  292. #if defined(CAN_DO_SWITCH_MSGS)
  293.     if (!found && (p4_global->proctable[p4_local->my_id].switch_port != -1))
  294.     {
  295.     if (sw_probe(req_from, p4_local->my_id, req_type, &len))
  296.         found = TRUE;
  297.     }
  298. #endif
  299.  
  300. #if defined(CAN_DO_TCMP_MSGS)
  301.     if (!found && MD_tcmp_msgs_available(req_from,req_type))
  302.     found = TRUE;
  303. #endif
  304.  
  305.     ALOG_LOG(p4_local->my_id,END_WAIT,1,"");
  306.     ALOG_LOG(p4_local->my_id,BEGIN_USER,0,"");
  307.  
  308.     return (found);
  309. }                /* p4_messages_available */
  310.  
  311. P4VOID queue_p4_message(msg, hdr)
  312. struct p4_msg *msg;
  313. struct p4_msg_queue *hdr;
  314. {
  315.     struct p4_queued_msg *q;
  316.  
  317.     q = alloc_quel();
  318.     q->qmsg = msg;
  319.     q->next = NULL;
  320.  
  321.     if (hdr->first_msg == NULL)
  322.     {
  323.     hdr->first_msg = q;
  324.     }
  325.     else
  326.     {
  327.     hdr->last_msg->next = q;
  328.     }
  329.     hdr->last_msg = q;
  330.     p4_dprintfl(30,"queued type %d message for process %d quel=%d\n",
  331.         msg->type,msg->to,q);
  332. }
  333.  
  334.  
  335. int send_message(type, from, to, msg, len, data_type, ack_req, p4_buff_ind)
  336. char *msg;
  337. int type, to, len, data_type;
  338. P4BOOL ack_req, p4_buff_ind;
  339. {
  340.     struct p4_msg *tmsg;
  341.     int conntype;
  342.  
  343.     if (to == 0xffff)        /* NCUBE broadcast */
  344.     conntype = CONN_LOCAL;
  345.     else
  346.     conntype = p4_local->conntab[to].type;
  347.  
  348.     p4_dprintfl(90, "send_message: to = %d, conntype=%d conntype=%s\n",
  349.         to, conntype, print_conn_type(conntype));
  350.     ALOG_LOG(p4_local->my_id,END_USER,0,"");
  351.     ALOG_LOG(p4_local->my_id,BEGIN_SEND,to,"");
  352.  
  353.     switch (conntype)
  354.     {
  355.       case CONN_ME:
  356.     tmsg = get_tmsg(type,from,to,msg,len,data_type,ack_req,p4_buff_ind);
  357.     p4_dprintfl(20, "sending msg of type %d to myself\n",type);
  358.     queue_p4_message(tmsg, p4_local->queued_messages);
  359.     p4_dprintfl(10, "sent msg of type %d to myself\n",type);
  360.     break;
  361.  
  362. #ifdef CAN_DO_SHMEM_MSGS
  363.       case CONN_SHMEM:
  364.     tmsg = get_tmsg(type, from, to, msg, len, data_type, 
  365.                         ack_req, p4_buff_ind);
  366.     shmem_send(tmsg);
  367.     break;
  368. #endif
  369.  
  370. #ifdef CAN_DO_CUBE_MSGS
  371.       case CONN_CUBE:
  372.     tmsg = get_tmsg(type,from,to,msg,len,data_type,ack_req,p4_buff_ind);
  373.     MD_cube_send(tmsg);
  374.     if (!p4_buff_ind)
  375.         free_p4_msg(tmsg);
  376.     break;
  377. #endif
  378.  
  379. #ifdef CAN_DO_SOCKET_MSGS
  380.       case CONN_REMOTE_NON_EST:
  381.     if (establish_connection(to))
  382.     {
  383.         p4_dprintfl(90, "send_message: conn just estabd to %d\n", to);
  384.     }
  385.     else
  386.     {
  387.         p4_dprintf("send_message: unable to estab conn to %d\n", to);
  388.         ALOG_LOG(p4_local->my_id,END_SEND,to,"");
  389.         ALOG_LOG(p4_local->my_id,BEGIN_USER,0,"");
  390.         return (-1);
  391.     }
  392.     /* no break; - just fall into connected code */
  393.       case CONN_REMOTE_EST:
  394.     if (data_type == P4NOX || p4_local->conntab[to].same_data_rep)
  395.     {
  396.         socket_send(type, from, to, msg, len, data_type, ack_req);
  397.     }
  398.     else
  399.     {
  400. #           ifdef CAN_DO_XDR
  401.         xdr_send(type, from, to, msg, len, data_type, ack_req);
  402. #           else
  403.         p4_error("cannot do xdr sends\n",0);
  404. #           endif
  405.     }
  406.     break;
  407. #endif
  408.  
  409. #if defined(CAN_DO_SWITCH_MSGS)
  410.       case CONN_REMOTE_SWITCH:
  411.     tmsg = get_tmsg(type,from,to,msg,len,data_type,ack_req,p4_buff_ind);
  412.     p4_dprintfl(20, "sending msg of type %d from %d to %d via switch_port %d\n",
  413.                 tmsg->type,tmsg->from,to,p4_local->conntab[tmsg->to].switch_port,tmsg);
  414.     sw_send(from, to,
  415.         p4_local->conntab[tmsg->to].switch_port, tmsg,
  416.         tmsg->len + sizeof(struct p4_msg) - sizeof(char *),
  417.         type);
  418.     p4_dprintfl(10, "sent msg of type %d from %d to %d via switch_port %d\n",
  419.                 tmsg->type,tmsg->from,to,p4_local->conntab[tmsg->to].switch_port,tmsg);
  420.     if (!p4_buff_ind)
  421.         free_p4_msg(tmsg);
  422.     break;
  423. #endif
  424.  
  425. #if defined(CAN_DO_TCMP_MSGS)
  426.       case CONN_TCMP:
  427.     tmsg = get_tmsg(type,from,to,msg,len,data_type,ack_req,p4_buff_ind);
  428.     p4_dprintfl(20, "sending msg of type %d to %d via tcmp\n",type,to);
  429.     MD_tcmp_send(type, from, to, tmsg, 
  430.              tmsg->len + sizeof(struct p4_msg) - sizeof(char *),
  431.              data_type, ack_req);
  432.     p4_dprintfl(10, "sent msg of type %d to %d via tcmp\n",type,to);
  433.     break;
  434. #endif
  435.  
  436.       case CONN_REMOTE_DYING:
  437.     p4_dprintfl(90, "send_message: proc %d is dying\n", to);
  438.     ALOG_LOG(p4_local->my_id,END_SEND,to,"");
  439.     ALOG_LOG(p4_local->my_id,BEGIN_USER,0,"");
  440.     return (-1);
  441.  
  442.       default:
  443.     p4_dprintf("send_message: to=%d; invalid conn type=%d\n",to,conntype);
  444.     ALOG_LOG(p4_local->my_id,END_SEND,to,"");
  445.     ALOG_LOG(p4_local->my_id,BEGIN_USER,0,"");
  446.     return (-1);
  447.     }
  448.  
  449.     ALOG_LOG(p4_local->my_id,END_SEND,to,"");
  450.     ALOG_LOG(p4_local->my_id,BEGIN_USER,0,"");
  451.     return (0);
  452. }                /* send_message */
  453.  
  454. struct p4_msg *get_tmsg(type,from,to,msg,len,data_type,ack_req,p4_buff_ind)
  455. char *msg;
  456. int type, from, to, len, data_type, ack_req, p4_buff_ind;
  457. {
  458.     int i;
  459.     struct p4_msg *tmsg;
  460.  
  461.     if (p4_buff_ind)
  462.     {
  463.     tmsg = (struct p4_msg *) (msg - sizeof(struct p4_msg) + sizeof(char *));
  464.     }
  465.     else
  466.     {
  467.         tmsg = alloc_p4_msg(len);
  468.     if (tmsg == NULL)
  469.     {
  470.         p4_dprintf("OOPS! get_tmsg: could not alloc buff **\n");
  471.         return (NULL);
  472.     }
  473.     bcopy(msg, (char *) &(tmsg->msg), len);
  474.     }
  475.     tmsg->type = type;
  476.     tmsg->from = from;
  477.     tmsg->to = to;
  478.     tmsg->len = len;
  479.     tmsg->ack_req = ack_req;
  480.     tmsg->data_type = data_type;
  481.     return (tmsg);
  482. }
  483.  
  484.  
  485. char *p4_msg_alloc(msglen)
  486. int msglen;
  487. {
  488.     char *t;
  489.  
  490.     t = (char *) alloc_p4_msg(msglen);
  491.     ((struct p4_msg *) t)->msg_id = -1;    /* msg not in use by IPSC isend */
  492.     t = t + sizeof(struct p4_msg) - sizeof(char *);
  493.     return(t);
  494. }
  495.  
  496. P4VOID p4_msg_free(m)
  497. char *m;
  498. {
  499.     char *t;
  500.  
  501.     t = m - sizeof(struct p4_msg) + sizeof(char *);
  502.     ((struct p4_msg *) t)->msg_id = -1;    /* msg not in use by IPSC isend */
  503.     free_p4_msg((struct p4_msg *) t);
  504. }
  505.  
  506.  
  507. P4VOID initialize_msg_queue(mq)
  508. struct p4_msg_queue *mq;
  509. {
  510.     mq->first_msg = NULL;
  511.     mq->last_msg = NULL;
  512.     (P4VOID) p4_moninit(&(mq->m), 1);
  513.     p4_lock_init(&(mq->ack_lock));
  514.     p4_lock(&(mq->ack_lock));
  515. }
  516.  
  517.  
  518. struct p4_queued_msg *alloc_quel()
  519. {
  520.     struct p4_queued_msg *q;
  521.  
  522.     p4_lock(&p4_global->avail_quel_lock);
  523.     if (p4_global->avail_quel == NULL)
  524.     {
  525.     q = (struct p4_queued_msg *) p4_shmalloc(sizeof(struct p4_queued_msg));
  526.     if (!q)
  527.         p4_error("alloc_quel:  could not allocate queue element",
  528.              sizeof(struct p4_queued_msg));
  529.     }
  530.     else
  531.     {
  532.     q = p4_global->avail_quel;
  533.     p4_global->avail_quel = q->next;
  534.     }
  535.     p4_unlock(&p4_global->avail_quel_lock);
  536.     return (q);
  537. }
  538.  
  539. P4VOID free_quel(q)
  540. struct p4_queued_msg *q;
  541. {
  542.     p4_lock(&p4_global->avail_quel_lock);
  543.     q->next = p4_global->avail_quel;
  544.     p4_global->avail_quel = q;
  545.     p4_unlock(&p4_global->avail_quel_lock);
  546. }
  547.